{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a5600f8f",
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install daft[aws,ray]\n",
    "!pip install Pillow numpy"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f8c864c4",
   "metadata": {
    "tags": [
     "parameters"
    ]
   },
   "outputs": [],
   "source": [
    "CI = False"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fb3007ac",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "USE_RAY = False"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "0a521469",
   "metadata": {},
   "source": [
    "```{hint}\n",
    "✨✨✨ **Run this notebook on Google Colab** ✨✨✨\n",
    "\n",
    "You can [run this notebook yourself with Google Colab](https://colab.research.google.com/github/Eventual-Inc/Daft/blob/main/tutorials/image_querying/top_n_red_color.ipynb)!\n",
    "```"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "1684139f-5b59-4e24-89d3-7cd67ba6c103",
   "metadata": {},
   "source": [
    "# Top N Most Red Images\n",
    "\n",
    "In this demo we will go through a simple example of using Daft to find the top N most red images out of the OpenImages dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a42aad93-2e85-4392-89e2-f2301550afa4",
   "metadata": {
    "jupyter": {
     "source_hidden": true
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "###\n",
    "# Settings for running on 10,000 rows in a distributed Ray cluster\n",
    "###\n",
    "\n",
    "if USE_RAY:\n",
    "    import ray\n",
    "\n",
    "    import daft.context\n",
    "\n",
    "    # NOTE: Replace with the address to an existing running Ray cluster, or None to start a local Ray cluster\n",
    "    RAY_CLUSTER_ADDRESS = \"ray://localhost:10001\"\n",
    "\n",
    "    ray.init(\n",
    "        address=RAY_CLUSTER_ADDRESS,\n",
    "        runtime_env={\"pip\": [\"daft\", \"pillow\", \"s3fs\"]},\n",
    "    )\n",
    "\n",
    "    daft.context.set_runner_ray(address=RAY_CLUSTER_ADDRESS)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "77aeeb92-e654-468b-b01e-430dd72fe9ea",
   "metadata": {},
   "source": [
    "## Constructing our DataFrame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8421c246-ef21-419a-90b6-ffe17b36bd97",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import daft\n",
    "\n",
    "IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True))  # Use anonymous S3 access\n",
    "\n",
    "df = daft.from_glob_path(\n",
    "    \"s3://daft-public-data/open-images/validation-images/*\",\n",
    "    io_config=IO_CONFIG,\n",
    ")\n",
    "\n",
    "if USE_RAY:\n",
    "    df = df.limit(10000)\n",
    "    df = df.repartition(64)\n",
    "else:\n",
    "    df = df.limit(100)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f3b2b0d0-b314-4f6a-bf78-326fcec066f9",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "db95e6d8-6b41-4dc1-91c9-5304c16a69df",
   "metadata": {},
   "source": [
    "### Filtering Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "10614260-e736-4b4b-8acd-a7481579e4e9",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df = df.where(df[\"size\"] < 300000)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "bf871b1b-d106-4063-8009-d2debfb790a8",
   "metadata": {},
   "source": [
    "**Daft is LAZY**: this filtering doesn't run immediately, but rather queues operations up in a query plan."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "9a15f564-7da8-4d58-b29c-3cbc39259e8a",
   "metadata": {},
   "source": [
    "Now, let's define another filter with a lower bound for the image size"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b769de9d-8262-4831-9a19-abe1802e4889",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df = df.where(df[\"size\"] > 200000)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "2267d8c2-c33e-45f9-a03d-7f87acaf057d",
   "metadata": {},
   "source": [
    "If we look at the plan, there are now two enqueued `Filter` operations!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1ec3bbc6-1b44-4bbd-ab89-969caca0cb7a",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df.explain()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "633ecf9b-ea58-44a6-b761-66ebb0bc4060",
   "metadata": {},
   "source": [
    "Doing these two `Filter`s one after another is really inefficient since we have to pass through the data twice!\n",
    "\n",
    "Don't worry though - Daft's query optimizer will actually optimize this at runtime and merge the two `Filter` operations into one. You can view the optimized plan with `show_all=True`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6553b3da-8113-4fc4-a993-dbc292780279",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df.explain(show_all=True)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "c7fb87fa-7387-4b11-aa28-58de658c272e",
   "metadata": {},
   "source": [
    "This is just one example of query optimization, and Daft does many other really important ones such as Predicate Pushdowns and Column Pruning to keep your execution plans running efficiently.\n",
    "\n",
    "Now we can **materialize** the filtered dataframe like so:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "eba7fa9b-e4c4-465f-9555-365f95ecb211",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Materializes the dataframe and shows first 10 rows\n",
    "\n",
    "df.collect()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "2dd014ad-8d14-40eb-883c-89cee2d4f9e6",
   "metadata": {},
   "source": [
    "Note that calling `.collect()` **materializes** the data. It will execute the above plan and all the computed data will be materialized in memory as long as the `df` variable is valid. This means that any subsequent operation on `df` will read from this materialized data instead of computing the entire plan again."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "539db555-f70f-4d7d-8c1f-8af7d3ba0213",
   "metadata": {},
   "source": [
    "Let's prune the columns to just the \"filepath\" column, which is the only one we need at the moment:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fef1c587-60fd-457c-8851-a44ea9a571a1",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df = df.select(\"path\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c40ebaf3-8fa6-4099-95a4-234fec2077b2",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Show doesn't materialize the data, but lets us peek at the first N rows\n",
    "# produced by the current query plan\n",
    "\n",
    "df.show(5)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "0a256a14-9cae-4262-807e-8d4cd2e6f0d3",
   "metadata": {},
   "source": [
    "### Working with Complex Data\n",
    "\n",
    "Now let's do some data manipulation, starting with some simple ones (URLs) and finally images!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "eb776f48-474a-4151-b88b-06b03a0a26bc",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df = df.with_column(\"image\", df[\"path\"].url.download(io_config=IO_CONFIG))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1ee58ddc-d06e-4f92-afee-b6d9c9e46330",
   "metadata": {
    "scrolled": true,
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Materialize the dataframe, so that we don't have to hit S3 again for subsequent operations\n",
    "df.collect()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "2c2f7980-0e8e-40ab-b02b-5f90d0fb3982",
   "metadata": {},
   "source": [
    "To decode the raw bytes we're downloading from the URL into an Image, we can simply use Daft's `.image.decode()` expression:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6851bf9c-478b-4613-a9f2-edbb3cc81f48",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df = df.with_column(\"image\", df[\"image\"].image.decode())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ec2c009a-e1ba-429d-8632-9724cc51bb02",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df.show(5)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "74dd028b",
   "metadata": {},
   "source": [
    "Sometimes you may need to run your own custom Python function. We can do this with the `.apply` Daft expression.\n",
    "\n",
    "Let's run a custom `magic_red_detector` function, which will detect red pixels in our images and present us with a mask!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "82afc506-cf6f-4e9a-b10e-7c7f1aed6b66",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import PIL.Image\n",
    "from PIL import ImageFilter\n",
    "\n",
    "\n",
    "def magic_red_detector(img: np.ndarray) -> PIL.Image.Image:\n",
    "    \"\"\"Gets a new image which is a mask covering all 'red' areas in the image.\"\"\"\n",
    "    img = PIL.Image.fromarray(img)\n",
    "    lower = np.array([245, 100, 100])\n",
    "    upper = np.array([10, 255, 255])\n",
    "    lower_hue, upper_hue = lower[0, np.newaxis, np.newaxis], upper[0, np.newaxis, np.newaxis]\n",
    "    lower_saturation_intensity, upper_saturation_intensity = (\n",
    "        lower[1:, np.newaxis, np.newaxis],\n",
    "        upper[1:, np.newaxis, np.newaxis],\n",
    "    )\n",
    "    hsv = img.convert(\"HSV\")\n",
    "    hsv = np.asarray(hsv).T\n",
    "    mask = np.all(\n",
    "        (hsv[1:, ...] >= lower_saturation_intensity) & (hsv[1:, ...] <= upper_saturation_intensity), axis=0\n",
    "    ) & ((hsv[0, ...] >= lower_hue) | (hsv[0, ...] <= upper_hue))\n",
    "    img = PIL.Image.fromarray(mask.T)\n",
    "    img = img.filter(ImageFilter.ModeFilter(size=5))\n",
    "    return img\n",
    "\n",
    "\n",
    "df = df.with_column(\n",
    "    \"red_mask\",\n",
    "    df[\"image\"].apply(magic_red_detector, return_dtype=daft.DataType.python()),\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "27c8bd0a-2638-467d-80c2-a759033d18b6",
   "metadata": {
    "scrolled": true,
    "tags": []
   },
   "outputs": [],
   "source": [
    "df.collect()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "c39b8793",
   "metadata": {},
   "source": [
    "Now to determine how \"red\" an image is, we can simply sum up the number of pixels in the `red_mask` column!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c9b940fd-6645-4034-9282-c46792c819ed",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "\n",
    "\n",
    "def sum_mask(mask: PIL.Image.Image) -> int:\n",
    "    val = np.asarray(mask).sum()\n",
    "    return int(val)\n",
    "\n",
    "\n",
    "df = df.with_column(\n",
    "    \"num_pixels_red\",\n",
    "    df[\"red_mask\"].apply(sum_mask, return_dtype=daft.DataType.int64()),\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7bddede4-bd05-4611-ab9e-74e272397f29",
   "metadata": {
    "scrolled": true,
    "tags": []
   },
   "outputs": [],
   "source": [
    "df.sort(\"num_pixels_red\", desc=True).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "826eabd2-54e8-4147-a5c6-e97c9d0a9e04",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.18"
  },
  "vscode": {
   "interpreter": {
    "hash": "e5d77f7bd5a748e4f6412a25f9708ab7af36936de941fc795d1a6b75eb2da082"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
